package flinkstudy.batch;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.AverageAccumulator;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

/**
 * Advanced API examples for the Flink batch (DataSet) API.
 *
 * @author daocr
 * @date 2020/1/10
 */
public class AdvancedApi {

    ExecutionEnvironment env = null;

    @Before
    public void init() {
        env = ExecutionEnvironment.getExecutionEnvironment();
    }


    /**
     * 创建累加器
     *
     * @see java.util.function.IntConsumer
     * @see org.apache.flink.api.common.accumulators.LongCounter
     * @see java.util.function.DoubleConsumer
     * @see org.apache.flink.api.common.accumulators.Histogram
     * 平均值累加器
     * @see org.apache.flink.api.common.accumulators.AverageAccumulator
     * <p>
     * 自定义累加器
     * @see org.apache.flink.api.common.accumulators.SimpleAccumulator
     */
    @Test
    public void accumulator() throws Exception {

        DataSource<Tuple2<Integer, String>> dataSource = env.fromElements(new Tuple2(1, "张三"),
                new Tuple2(2, "李四"), new Tuple2(3, "李四")
                , new Tuple2(4, "赵六"), new Tuple2(5, "赵六"), new Tuple2(6, "赵六"));

        MapOperator<Tuple2<Integer, String>, Tuple2<Integer, String>> countMapOperator = dataSource.map(new RichMapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {

            private AverageAccumulator averageAccumulator;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);

                // 注册累加器
                this.averageAccumulator = new AverageAccumulator();
                getRuntimeContext().addAccumulator("count", averageAccumulator);

            }

            @Override
            public Tuple2<Integer, String> map(Tuple2<Integer, String> value) throws Exception {
                averageAccumulator.add(value.f0);
                return value;
            }
        });


        countMapOperator.writeAsText("./accumulatorCount", FileSystem.WriteMode.OVERWRITE);

        JobExecutionResult jobExecutionResult = env.execute("累加器 例子");

        Object count = jobExecutionResult.getAccumulatorResult("count");

        System.out.println("累加 结果：" + count);


    }
}
